{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copyright 2019 The Kubeflow Authors. All Rights Reserved.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#     http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kubeflow Pipelines local development quickstart\n",
    "\n",
    "This notebook demonstrates how to:\n",
    "\n",
    "* Author components with the lightweight method and with `ContainerOp` based on existing images.\n",
    "* Author pipelines.\n",
    "\n",
    "**Note:** Make sure that Docker is installed in your local environment."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# PROJECT_ID is used to construct the Docker image registry path. We will use Google Container Registry,\n",
    "# but any other accessible registry works as well.\n",
    "PROJECT_ID='Your-Gcp-Project-Id'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Install Pipeline SDK\n",
    "!pip3 install kfp --upgrade\n",
    "!mkdir -p tmp/pipelines"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Part 1\n",
    "# Two ways to author a component to list blobs in a GCS bucket\n",
    "A pipeline is composed of one or more components. In this section, you will build a single component that lists the blobs in a GCS bucket. Then you build a pipeline that consists of this component. There are two ways to author a component. In the following sections we will go through each of them."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Create a lightweight Python component from a Python function."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.1 Define component function\n",
    "The requirements for the component function:\n",
    "* The function must be stand-alone.\n",
    "* The function can only import packages that are available in the base image.\n",
    "* If the function operates on numbers, the parameters must have type hints. Supported types are `int`, `float`, `bool`. Everything else is passed as `str`, that is, string.\n",
    "* To build a component with multiple output values, use Python’s `typing.NamedTuple` type hint syntax."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def list_blobs(bucket_name: str) -> str:\n",
    "  '''Lists all the blobs in the bucket.'''\n",
    "  import subprocess\n",
    "\n",
    "  subprocess.call(['pip', 'install', '--upgrade', 'google-cloud-storage'])\n",
    "  from google.cloud import storage\n",
    "  storage_client = storage.Client()\n",
    "  bucket = storage_client.get_bucket(bucket_name)\n",
    "  list_blobs_response = bucket.list_blobs()\n",
    "  blobs = ','.join([blob.name for blob in list_blobs_response])\n",
    "  print(blobs)\n",
    "  return blobs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.2 Create a lightweight Python component"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.components as comp\n",
    "\n",
    "# Converts the function to a lightweight Python component.\n",
    "list_blobs_op = comp.func_to_container_op(list_blobs)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.3 Define pipeline\n",
    "Note that when accessing Google Cloud Storage, you need to make sure the pipeline can authenticate to GCP. Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl as dsl\n",
    "\n",
    "# Defines the pipeline.\n",
    "@dsl.pipeline(name='List GCS blobs', description='Lists GCS blobs.')\n",
    "def pipeline_func(bucket_name):\n",
    "  list_blobs_task = list_blobs_op(bucket_name)\n",
    "  # Use the following commented code instead if you want to use GSA key for authentication.\n",
    "  #\n",
    "  # from kfp.gcp import use_gcp_secret\n",
    "  # list_blobs_task = list_blobs_op(bucket_name).apply(use_gcp_secret('user-gcp-sa'))\n",
    "  # Same for below.\n",
    "\n",
    "# Compile the pipeline to a file.\n",
    "import kfp.compiler as compiler\n",
    "compiler.Compiler().compile(pipeline_func, 'tmp/pipelines/list_blobs.pipeline.tar.gz')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Wrap an existing Docker container image using `ContainerOp`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.1 Create a Docker container\n",
    "Create your own container image that includes your program. If your component creates outputs to be fed as inputs to downstream components, each output must be written as a string to a separate local text file inside the container. For example, if a trainer component needs to output the trained model path, it can write the path to a local file `/output.txt`. The string written to an output file cannot be too big. If it is too big (>> 100 kB), save the output to external persistent storage and pass the storage path to the next component.\n",
    "\n",
    "Before you start, make sure that `PROJECT_ID` in the Setup section is set to your Google Cloud Platform project ID."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following cell creates a file `app.py` that contains a Python script. The script takes a GCS bucket name as an input argument, gets the list of blobs in that bucket, prints the list, and also writes it to an output file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "# Create folders if they don't exist.\n",
    "mkdir -p tmp/components/list-gcs-blobs\n",
    "\n",
    "# Create the Python file that lists GCS blobs.\n",
    "cat > ./tmp/components/list-gcs-blobs/app.py <<HERE\n",
    "import argparse\n",
    "from google.cloud import storage\n",
    "# Parse arguments.\n",
    "parser = argparse.ArgumentParser()\n",
    "parser.add_argument(\n",
    "    '--bucket', type=str, required=True, help='GCS bucket name.')\n",
    "args = parser.parse_args()\n",
    "# Create a client.\n",
    "storage_client = storage.Client()\n",
    "# List blobs.\n",
    "bucket = storage_client.get_bucket(args.bucket)\n",
    "list_blobs_response = bucket.list_blobs()\n",
    "blobs = ','.join([blob.name for blob in list_blobs_response])\n",
    "print(blobs)\n",
    "with open('/blobs.txt', 'w') as f:\n",
    "  f.write(blobs)\n",
    "HERE"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now create a container that runs the script. Start by creating a `Dockerfile`, which contains the instructions to assemble a Docker image. The `FROM` statement specifies the base image from which you are building. `WORKDIR` sets the working directory. When you assemble the Docker image, `COPY` copies the required files and directories (for example, `app.py`) to the filesystem of the container. `RUN` executes a command (for example, to install the dependencies) and commits the results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "# Create Dockerfile.\n",
    "cat > ./tmp/components/list-gcs-blobs/Dockerfile <<EOF\n",
    "FROM python:3.6-slim\n",
    "WORKDIR /app\n",
    "COPY . /app\n",
    "RUN pip install --upgrade google-cloud-storage\n",
    "EOF"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that the Dockerfile exists, you can build the Docker image and push it to a registry that hosts it. Create a shell script that builds the container image and stores it in the Google Container Registry."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash -s \"{PROJECT_ID}\"\n",
    "\n",
    "IMAGE_NAME=\"listgcsblobs\"\n",
    "TAG=\"latest\" # \"v_$(date +%Y%m%d_%H%M%S)\"\n",
    "\n",
    "# Create script to build docker image and push it.\n",
    "cat > ./tmp/components/list-gcs-blobs/build_image.sh <<HERE\n",
    "PROJECT_ID=\"${1}\"\n",
    "IMAGE_NAME=\"${IMAGE_NAME}\"\n",
    "TAG=\"${TAG}\"\n",
    "GCR_IMAGE=\"gcr.io/\\${PROJECT_ID}/\\${IMAGE_NAME}:\\${TAG}\"\n",
    "docker build -t \\${IMAGE_NAME} .\n",
    "docker tag \\${IMAGE_NAME} \\${GCR_IMAGE}\n",
    "docker push \\${GCR_IMAGE}\n",
    "docker image rm \\${IMAGE_NAME}\n",
    "docker image rm \\${GCR_IMAGE}\n",
    "HERE"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "# Build and push the image.\n",
    "cd tmp/components/list-gcs-blobs\n",
    "bash build_image.sh"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.2 Define each component\n",
    "Define a component by creating an instance of `kfp.dsl.ContainerOp` that describes the interactions with the Docker container image created in the previous step. You need to specify the component name, the image to use, the command to run after the container starts, the input arguments, and the file outputs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl\n",
    "\n",
    "def list_gcs_blobs_op(name, bucket):\n",
    "  return kfp.dsl.ContainerOp(\n",
    "      name=name,\n",
    "      image='gcr.io/{}/listgcsblobs:latest'.format(PROJECT_ID),\n",
    "      command=['python', '/app/app.py'],\n",
    "      file_outputs={'blobs': '/blobs.txt'},\n",
    "      arguments=['--bucket', bucket]\n",
    "  )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.3 Create your workflow as a Python function\n",
    "Start by creating a folder to store the pipeline file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create folders if they don't exist.\n",
    "!mkdir -p tmp/pipelines"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define your pipeline as a Python function. `@kfp.dsl.pipeline` is a required decorator that includes the `name` and `description` properties. Then compile the pipeline function. After the compilation is completed, a pipeline file is created."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "import datetime\n",
    "import kfp.compiler as compiler\n",
    "\n",
    "# Define the pipeline\n",
    "@kfp.dsl.pipeline(\n",
    "  name='List GCS Blobs',\n",
    "  description='Takes a GCS bucket name as input and lists the blobs.'\n",
    ")\n",
    "def pipeline_func(bucket='Enter your bucket name here.'):\n",
    "  list_blobs_task = list_gcs_blobs_op('List', bucket)\n",
    "\n",
    "# Compile the pipeline to a file.\n",
    "filename = 'tmp/pipelines/list_blobs{dt:%Y%m%d_%H%M%S}.pipeline.tar.gz'.format(\n",
    "    dt=datetime.datetime.now())\n",
    "compiler.Compiler().compile(pipeline_func, filename)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Follow the [instructions](https://www.kubeflow.org/docs/other-guides/accessing-uis/) on kubeflow.org to access Kubeflow UIs. Upload the created pipeline and run it.\n",
    "\n",
    "**Warning:** When the pipeline is run, it pulls the image from the repository to the Kubernetes cluster to create a container. Kubernetes caches pulled images, so after you push a rebuilt image under the same tag, the cluster may keep running the cached copy. One solution is to use the image digest instead of the tag in your component DSL, for example, `s/v1/sha256:9509182e27dcba6d6903fccf444dc6188709cc094a018d5dd4211573597485c9/g`. Alternatively, if you don't want to update the digest every time, you can use the `:latest` tag, which forces Kubernetes to always pull the latest image."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "___"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Part 2\n",
    "# Create a pipeline using Kubeflow Pipelines"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this section, you will build another component. Then you will see how to connect components to build a multi-component pipeline. You will build the new component by building a Docker container image and wrapping it using `ContainerOp`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1 Create a container to view CSV\n",
    "Build a component that can the output of the first component explained in the preceding section (that is, the list of GCS blobs), selects a file ending in `iris.csv` and displays its content as an artifact. Start by uploading to your Storage bucket the `quickstart_iris.csv` file that is included in the repository."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash -s \"{PROJECT_ID}\"\n",
    "# Create folders if they don't exist.\n",
    "mkdir -p tmp/components/view-input\n",
    "\n",
    "\n",
    "# Create the Python file that selects and views the input CSV.\n",
    "cat > ./tmp/components/view-input/app.py <<HERE\n",
    "import argparse\n",
    "import json\n",
    "from google.cloud import storage\n",
    "# Parse arguments.\n",
    "parser = argparse.ArgumentParser()\n",
    "parser.add_argument('--blobs', type=str, required=True, help='List of blobs.')\n",
    "args = parser.parse_args()\n",
    "blobs = args.blobs.split(',')\n",
    "inputs = filter(lambda s: s.endswith('iris.csv'), blobs)\n",
    "input = list(inputs)[0]\n",
    "print('The CSV file is {}'.format(input))\n",
    "# CSV header.\n",
    "header = [\n",
    "    'sepal_length',\n",
    "    'sepal_width',\n",
    "    'petal_length',\n",
    "    'petal_width',\n",
    "    'species',\n",
    "]\n",
    "# Add metadata for an artifact.\n",
    "metadata = {\n",
    "  'outputs' : [{\n",
    "    'type': 'table',\n",
    "    'storage': 'gcs',\n",
    "    'format': 'csv',\n",
    "    'header': header,\n",
    "    'source': input\n",
    "  }]\n",
    "}\n",
    "print(metadata)\n",
    "# Create an artifact.\n",
    "with open('/mlpipeline-ui-metadata.json', 'w') as f:\n",
    "  json.dump(metadata, f)\n",
    "HERE\n",
    "\n",
    "\n",
    "# Create Dockerfile.\n",
    "cat > ./tmp/components/view-input/Dockerfile <<HERE\n",
    "FROM python:3.6-slim\n",
    "WORKDIR /app\n",
    "COPY . /app\n",
    "RUN pip install --upgrade google-cloud-storage\n",
    "HERE\n",
    "\n",
    "\n",
    "# Create script to build docker image and push it.\n",
    "IMAGE_NAME=\"viewinput\"\n",
    "TAG=\"latest\" # \"v_$(date +%Y%m%d_%H%M%S)\"\n",
    "cat > ./tmp/components/view-input/build_image.sh <<HERE\n",
    "PROJECT_ID=\"${1}\"\n",
    "IMAGE_NAME=\"${IMAGE_NAME}\"\n",
    "TAG=\"${TAG}\"\n",
    "GCR_IMAGE=\"gcr.io/\\${PROJECT_ID}/\\${IMAGE_NAME}:\\${TAG}\"\n",
    "docker build -t \\${IMAGE_NAME} .\n",
    "docker tag \\${IMAGE_NAME} \\${GCR_IMAGE}\n",
    "docker push \\${GCR_IMAGE}\n",
    "docker image rm \\${IMAGE_NAME}\n",
    "docker image rm \\${GCR_IMAGE}\n",
    "HERE\n",
    "\n",
    "\n",
    "# Build and push the image.\n",
    "cd tmp/components/view-input\n",
    "bash build_image.sh"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2 Define each component\n",
    "Define each of your components by using `kfp.dsl.ContainerOp`. Describe the interactions with the Docker container image created in the previous step by specifying the component name, the image to use, the command to run after the container starts, the input arguments, and the file outputs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl\n",
    "\n",
    "def list_gcs_blobs_op(name, bucket):\n",
    "  return kfp.dsl.ContainerOp(\n",
    "      name=name,\n",
    "      image='gcr.io/{}/listgcsblobs:latest'.format(PROJECT_ID),\n",
    "      command=['python', '/app/app.py'],\n",
    "      arguments=['--bucket', bucket],\n",
    "      file_outputs={'blobs': '/blobs.txt'},\n",
    "      output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'},\n",
    "  )\n",
    "\n",
    "def view_input_op(name, blobs):\n",
    "  return kfp.dsl.ContainerOp(\n",
    "      name=name,\n",
    "      image='gcr.io/{}/viewinput:latest'.format(PROJECT_ID),\n",
    "      command=['python', '/app/app.py'],\n",
    "      arguments=['--blobs', blobs]\n",
    "  )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3 Create your workflow as a Python function\n",
    "Define your pipeline as a Python function. `@kfp.dsl.pipeline` is a required decorator that includes the `name` and `description` properties. `pipeline_func` defines the pipeline with the `bucket` parameter. When the user uploads the pipeline to the system and starts creating a new run from it, they'll see an input box for the `bucket` parameter with the initial value `Enter your bucket name here.`. You can replace the initial value with your bucket name at runtime. `list_gcs_blobs_op('List', bucket)` will create a component named `List` that lists the blobs. `view_input_op('View', list_blobs_task.outputs['blobs'])` will create a component named `View` that views a CSV. `list_blobs_task.outputs['blobs']` tells the pipeline to take the output of the first component, stored as a string in `blobs.txt`, as an input for the second component."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create folders if they don't exist.\n",
    "!mkdir -p tmp/pipelines"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "import datetime\n",
    "import kfp.compiler as compiler\n",
    "\n",
    "# Define the pipeline\n",
    "@kfp.dsl.pipeline(\n",
    "  name='Quickstart pipeline',\n",
    "  description='Takes a GCS bucket name and views a CSV input file in the bucket.'\n",
    ")\n",
    "def pipeline_func(bucket='Enter your bucket name here.'):\n",
    "  list_blobs_task = list_gcs_blobs_op('List', bucket)\n",
    "  view_input_task = view_input_op('View', list_blobs_task.outputs['blobs'])\n",
    "\n",
    "# Compile the pipeline to a file.\n",
    "filename = 'tmp/pipelines/quickstart_pipeline{dt:%Y%m%d_%H%M%S}.pipeline.tar.gz'.format(\n",
    "    dt=datetime.datetime.now())\n",
    "compiler.Compiler().compile(pipeline_func, filename)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Follow the [instructions](https://www.kubeflow.org/docs/other-guides/accessing-uis/) on kubeflow.org to access Kubeflow UIs. Upload the created pipeline and run it."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Clean up"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "import shutil\n",
    "import pathlib\n",
    "path = pathlib.Path(\"tmp\")\n",
    "shutil.rmtree(path)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
